package com.etc

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Auther: Wangcc
  * @Date: 2018/8/20 18:20
  * @Description: 使用反射方式将RDD转换为DataFrame
  */
object RDD2DataFrameReflectionScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("RDD2DataFrameReflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 在Scala中使用反射方式，进行RDD到DataFrame的转换，需要手动导入一个隐式转换
    import sqlContext.implicits._

    case class Student(id: Int, name: String, age: Int)

    // 这里其实就是一个普通的，元素为case class的RDD
    // 直接对它使用toDF()方法，即可转换为DataFrame
    val studentDF = sc.textFile("D:\\Spark 2.0\\课程代码\\第74讲-Spark SQL：使用反射方式将RDD转换为DataFrame\\文档\\students.txt", 1)
      .map { line => line.split(",") }
      .map { arr => Student(arr(0).trim().toInt, arr(1), arr(2).trim().toInt) }
      .toDS()

    studentDF.registerTempTable("students")

    val teenagerDF = sqlContext.sql("select * from students where age<=18")

    val teenagerRDD = teenagerDF.rdd

    // 在scala中，row中的数据的顺序，反而是按照我们期望的来排列的，这个跟java是不一样的哦
    teenagerRDD.map { row => Student(row(0).toString().toInt, row(1).toString(), row(2).toString().toInt) }
      .collect()
      .foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) }

    // 在scala中，对row的使用，比java中的row的使用，更加丰富
    // 在scala中，可以用row的getAs()方法，获取指定列名的列
    teenagerRDD.map { row => Student(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age")) }
      .collect()
      .foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) }

    // 还可以通过row的getValuesMap()方法，获取指定几列的值，返回的是个map
    val studentRDD = teenagerRDD.map { row => {
      val map = row.getValuesMap[Any](Array("id", "name", "age"));
      Student(map("id").toString().toInt, map("name").toString(), map("age").toString().toInt)
    }
    }
    studentRDD.collect().foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) }
  }

}
